/*
 * MtcpMediaSource.cpp
 *
 *  Created on: 2016年6月23日
 *      Author: terry
 */

#include "MtcpMediaSource.h"
#include "URI.h"
#include "MediaTransportHelper.h"
#include <json/writer.h>
#include "TStringUtil.h"
#include "CLog.h"


extern "C"
{
#include "libavutil/time.h"
};



namespace av
{

MtcpMediaSource::MtcpMediaSource():
    m_url(),
    m_params(),
    m_format(),
    m_state(),
    m_curTime(),
    m_timeout(1000*10)
{
    m_buffer.ensure(1024 * 500);
}

MtcpMediaSource::~MtcpMediaSource()
{
    close();
}

int MtcpMediaSource::open(const std::string& url, const std::string& params)
{
    m_url = url;
    m_params = params;
    m_paramSet.fromString(params, util::ParamSet::SEPERATOR, util::ParamSet::SEG);

    util::Uri uri;
    if (!uri.parse(url))
    {
        return EINVAL;
    }

    if (!m_socket.open(SOCK_STREAM))
    {
        return ENODEV;
    }

    m_sockPair.open();
    
    m_socket.setReuse();

#ifdef ANDROID

#else
    int value = true;
    int ret = ::setsockopt(m_socket.getHandle(), IPPROTO_TCP, TCP_NODELAY, (char*)&value, sizeof(value));
#endif //

    if (m_paramSet.exists("buffer_size"))
    {
        int bufSize = 0;
        m_paramSet.get("buffer_size", bufSize);
        if (bufSize > 0)
        {
            m_socket.setRecvBufferSize(bufSize);
        }
    }
    
    m_socket.setNonblock(true);
    
    m_socket.connect(uri.m_host.c_str(), uri.m_port);

    if (!checkConnected(m_timeout))
    {
        close();
        return EACCES;
    }

    m_socket.setKeepAlive(true, 2, 2, 2);

    comn::StringUtil::trim(uri.m_path, "/");

    if (!doPlay(uri.m_path))
    {
        close();
        return ENOENT;
    }

    return 0;
}

void MtcpMediaSource::close()
{
    m_socket.close();
    m_sockPair.close();
}

bool MtcpMediaSource::isOpen()
{
    return m_socket.isOpen();
}

bool MtcpMediaSource::getFormat(MediaFormat& fmt)
{
    fmt = m_format;
    return m_format.isValid();
}

int MtcpMediaSource::getDuration()
{
    return m_format.m_duration;
}

int MtcpMediaSource::play()
{
    m_state = STATE_PLAYING;
    return 0;
}

int MtcpMediaSource::pause()
{
    m_state = STATE_PAUSED;
    return 0;
}

void MtcpMediaSource::stop()
{
    // pass
    m_state = STATE_STOPPED;
}

int MtcpMediaSource::getState()
{
    return m_state;
}

bool MtcpMediaSource::seekable()
{
    return false;
}

int MtcpMediaSource::seek(int64_t offset)
{
    return ENOSYS;
}

int64_t MtcpMediaSource::getTime()
{
    return m_curTime;
}

int MtcpMediaSource::setScale(float scale)
{
    return ENOSYS;
}

float MtcpMediaSource::getScale()
{
    return 1.0;
}

int MtcpMediaSource::read(AVPacket& pkt)
{
    if (!checkReadable(500))
    {
        return AVERROR(EAGAIN);
    }
    
    if (!doReadFrame())
    {
        if (m_socket.isOpen())
        {
            return AVERROR(EAGAIN);
        }
        else
        {
            return AVERROR(EACCES);
        }
    }

    MediaTransportHeader* header = (MediaTransportHeader*)m_buffer.data();

    uint8_t* data = m_buffer.data() + sizeof(MediaTransportHeader);
    int size = m_buffer.size() - sizeof(MediaTransportHeader);
    int64_t pts = header->timestamp;
    
    av_init_packet(&pkt);
    av_new_packet(&pkt, size);

    memcpy(pkt.data, data, pkt.size);

    pkt.stream_index = header->subtype;
    pkt.pts = pts;
    pkt.dts = pts;
    pkt.duration = header->duration;
    pkt.flags = header->flags;

    if (header->subtype == MEDIA_TYPE_VIDEO)
    {
        AVRational src = av_make_q(1, m_format.m_clockRate);
        AVRational dst = av_make_q(1, AV_TIME_BASE);
        if ((m_format.m_clockRate > 0) && (m_format.m_clockRate != AV_TIME_BASE))
        {
            av_packet_rescale_ts(&pkt, src, dst);
        }
    }
    else if (header->subtype == MEDIA_TYPE_AUDIO)
    {
        AVRational src = av_make_q(1, m_format.m_audioRate);
        AVRational dst = av_make_q(1, AV_TIME_BASE);
        if ((m_format.m_audioRate > 0) && (m_format.m_audioRate != AV_TIME_BASE))
        {
            av_packet_rescale_ts(&pkt, src, dst);
        }
    }

    pkt.pts = pkt.dts = av_gettime_relative();

    //CLog::debug("video pkt. pts:%I64d, size:%d\n", pkt.dts / 1000,  pkt.size);

    m_buffer.resize(0);

    m_curTime = pkt.pts;

    return 0;
}

void MtcpMediaSource::interrupt()
{
    m_sockPair.makeReadable();
}

bool MtcpMediaSource::isLive()
{
    return true;
}

bool MtcpMediaSource::checkReadable(long millsec)
{
    fd_set rset;
    FD_ZERO(&rset);
    FD_SET(m_socket.getHandle(), &rset);
    FD_SET(m_sockPair.getHandle(), &rset);

    socket_t maxSock = std::max(m_socket.getHandle(), m_sockPair.getHandle());

    timeval* ptv = NULL;
    timeval tv = { millsec / 1000, (millsec % 1000) * 1000 };
    ptv = (millsec >= 0) ? (&tv) : NULL;

    int ret = select((int)maxSock + 1, &rset, NULL, NULL, ptv);
    if (ret > 0)
    {
        if (FD_ISSET(m_socket.getHandle(), &rset))
        {
            return true;
        }
    }

    return false;
}

bool MtcpMediaSource::checkConnected(long millsec)
{
    fd_set wset;
    FD_ZERO(&wset);
    FD_SET(m_socket.getHandle(), &wset);

    fd_set rset;
    FD_ZERO(&rset);
    FD_SET(m_sockPair.getHandle(), &rset);

    socket_t maxSock = std::max(m_socket.getHandle(), m_sockPair.getHandle());

    timeval* ptv = NULL;
    timeval tv = { millsec / 1000, (millsec % 1000) * 1000 };
    ptv = (millsec >= 0) ? (&tv) : NULL;

    int ret = select((int)maxSock + 1, &rset, &wset, NULL, ptv);
    if (ret > 0)
    {
        if (FD_ISSET(m_socket.getHandle(), &wset))
        {
            return true;
        }
    }

    return false;
}

bool MtcpMediaSource::doPlay(const std::string& name)
{
    av::MediaTransportHeader hdr;    
    av::MediaTransport::setCommand(hdr, av::kMediaCmdPlay);

    Json::Value req;
    req["name"] = name;
    
    std::string text = req.toStyledString();

    hdr.length = text.size();

    m_socket.send((char*)&hdr, sizeof(hdr), 0);
    int ret = m_socket.send(text.c_str(), text.size(), 0);
    if (ret != text.size())
    {
        return false;
    }

    if (!checkReadable(m_timeout))
    {
        return false;
    }

    int length = m_socket.receive((char*)&hdr, sizeof(hdr), 0);
    if (length != sizeof(hdr))
    {
        return false;
    }

    comn::ByteBuffer buffer;
    buffer.ensure(hdr.length);

    length = m_socket.receive((char*)buffer.data(), hdr.length, 0);
    if (length != hdr.length)
    {
        return false;
    }

    Json::Value value;
    Json::Reader reader;
    if (!reader.parse(buffer.c_str(), buffer.c_str() + hdr.length, value))
    {
        return false;
    }

    MediaTransportHelper::fromJson(m_format, value);
    //m_format.m_clockRate = 1000000;
    //m_format.m_audioRate = 1000000;

    return true;
}

bool MtcpMediaSource::doReadFrame()
{
    size_t bufSize = m_buffer.size();
    if (bufSize < sizeof(av::MediaTransportHeader))
    {
        char* buf = (char*)m_buffer.data() + bufSize;
        int size = sizeof(av::MediaTransportHeader) - m_buffer.size();
        int length = m_socket.receive(buf, size, 0);
        if (length < 0)
        {
            if (m_socket.checkReadAndClose(0) < 0)
            {
                m_socket.close();
                return false;
            }

            return false;
        }
        else if (length == 0)
        {
            m_socket.close();
            return false;
        }

        m_buffer.resize(bufSize + length);

        if (length != size)
        {
            return false;
        }
    }

    av::MediaTransportHeader* header = (av::MediaTransportHeader*)m_buffer.data();

    assert(header->magic == av::MediaTransport::MAGIC);
    

    m_buffer.ensure(header->length + sizeof(av::MediaTransportHeader));
    header = (av::MediaTransportHeader*)m_buffer.data();

    char* buf = (char*)m_buffer.data() + m_buffer.size();
    int size = header->length + sizeof(av::MediaTransportHeader) - m_buffer.size();

    int length = m_socket.receive(buf, size, 0);
    if (length < 0)
    {
        return false;
    }
    else if (length == 0)
    {
        m_socket.close();
        return false;
    }

    m_buffer.resize(m_buffer.size() + length);
    if (length != size)
    {
        return false;
    }

    return true;
}



} /* namespace av */
